package org.qy.rocketmq.cs.rocket.producer;

import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.qy.rocketmq.cs.enums.ErrorCode;
import org.qy.rocketmq.cs.exception.BadRequestException;
import org.qy.rocketmq.cs.exception.MQRuntimeException;
import org.qy.rocketmq.cs.rocket.MQExecutionProcessor;
import org.qy.rocketmq.cs.rocket.MQFunction;
import org.qy.rocketmq.cs.rocket.listener.ProducerTransactionListener;
import org.qy.rocketmq.cs.utils.JsonUtil;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * All rights Reserved, Designed By www.cu-sc.com
 *
 * @Title:  ProducerTransaction.java.java
 * @Package org.qy.rocketmq.cs.rocket.producer
 * @Description: 消息生产者
 * @Since: JDK 1.8
 * @Author: fangyukang
 * @Email: fangyk@cu-sc.com
 * @Version: v1.0.0
 * @Date: 2020/7/1 22:18
 * @Copyright: 2020 www.cu-sc.com All rights reserved. <br/>
 * 注意：本内容仅限于联通集团内部传阅，禁止外泄以及用于其他的商业目的<br/>
 */
@Data
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "rocketmq.transaction-producer")
public class ProducerTransaction extends TransactionMQProducer
{
    @Resource
    ProducerTransactionListener producerTransactionListener;

    @Override
    public String toString()
    {
        return "ProducerTransaction{" +
                "} " + super.toString();
    }

    public ProducerTransaction()
    {
        super();
        log.info("ProducerTransaction|生产者事务对象参数设置");
        this.setVipChannelEnabled(false);
        this.setRetryTimesWhenSendAsyncFailed(10);
    }

    /**
     * @Function: ProducerTransaction.java
     * @Description: 使用事物模式发送
     * @params:
     * @return:
     * @throws: 异常描述
     * @version: v1.0.0
     * @author: fangyukang
     * @date: 2020/7/1 22:19
     * 注意:本内容仅限于联通智网科技有限公司内部传阅,禁止外泄以及用于其他的商业目
     * Modification History:
     * Date              Author          Version         Description
     * 2020/7/1        fangyukang          v1.0.0          修改原因
     */
    public void send(String topic, String tags, Object o)
    {
        this.send(topic, tags, o, null);
    }

    /**
     * @Function: ProducerTransaction.java
     * @Description: 使用事物模式发送
     * @params:
     * @return:
     * @throws: 异常描述
     * @version: v1.0.0
     * @author: fangyukang
     * @date: 2020/7/1 22:19
     * 注意:本内容仅限于联通智网科技有限公司内部传阅,禁止外泄以及用于其他的商业目
     * Modification History:
     * Date              Author          Version         Description
     * 2020/7/1        fangyukang          v1.0.0          修改原因
     */
    public void send(String topic, String tags, Object o, MQFunction fun)
    {
        Message msg = new Message();
        msg.setTopic(topic);
        msg.setTags(tags);
        msg.setBody(JsonUtil.stringify(o).getBytes(StandardCharsets.UTF_8));
        this.send(msg, fun);
    }

    /**
     * @Function: ProducerTransaction.java
     * @Description: 使用事物模式发送
     * @params:
     * @return:
     * @throws: 异常描述
     * @version: v1.0.0
     * @author: fangyukang
     * @date: 2020/7/1 22:20
     * 注意:本内容仅限于联通智网科技有限公司内部传阅,禁止外泄以及用于其他的商业目
     * Modification History:
     * Date              Author          Version         Description
     * 2020/7/1        fangyukang          v1.0.0          修改原因
     */
    public void send(Message msg, MQFunction fun)
    {
        // 创建执行器
        MQExecutionProcessor executionProcessor = new MQExecutionProcessor(fun);
        try {
            // sendMessageInTransaction 事物消息不支持延时消息
            SendResult result = super.sendMessageInTransaction(msg, executionProcessor);
            log.info("ProducerTransaction|send|使用事物模式发送消息|返回出参：={}", JSONObject.toJSONString(result));
            if(result.getSendStatus() != SendStatus.SEND_OK)
            {
                throw new MQRuntimeException(String.format("mq transaction send error sendStatus:%s", result.getSendStatus().name()));
            }
        }
        catch (Exception e) {
            log.error("ProducerTransaction|send|使用事物模式发送消息异常".concat(e.getLocalizedMessage()), e);
            throw new BadRequestException(ErrorCode.SERVER_ERROR);
        }
        // 判断 MQFunction 是否执行失败，执行失败直接返回异常
        if(!executionProcessor.getResult()) {
            log.info("ProducerTransaction|send|使用事物模式发送消息失败");
            throw executionProcessor.getE();
        }
    }

    @PostConstruct
    @Override
    public void start(){
        try {
            Assert.notNull(producerTransactionListener, "rocketmq 生产 producerTransactionListener 不能为空，请实现IProducerTransactionListener接口类");
            this.setTransactionListener(producerTransactionListener);
            super.start();
            log.info("ProducerTransaction|start|rocket mq producerTransaction server开启成功");
        } catch (MQClientException e) {
            log.error("ProducerTransaction|start|rocket mq producerTransaction server启动失败", e);
        }
    }

    @PreDestroy
    @Override
    public void shutdown()
    {
        super.shutdown();
        log.info("ProducerTransaction|start|rocket mq producerTransaction server关闭成功");
    }
}
